First, we parse the connection details from the vcap.json file.


In [5]:
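import scala.io.Source
import play.api.libs.json._

// Read vcap.json and close the file handle once it has been parsed
val source = Source.fromFile("vcap.json")
val json = try Json.parse(source.mkString) finally source.close()

// kafka_brokers_sasl is a JSON array of "host:port" strings; join it into the
// comma-separated form that Kafka's bootstrap.servers setting expects.
// Reading typed values with .as[...] avoids stripping quotes and brackets by hand.
val bootstrap_servers = (json \ "kafka_brokers_sasl").as[Seq[String]].mkString(",")
val username = (json \ "user").as[String]
val password = (json \ "password").as[String]
val topic    = (json \ "topic").as[String]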

If the Message Hub consumer does not work, check that your connection details are correct. To verify them, uncomment the code in the cell below, which also prints the username, password and topic.


In [15]:
println("Connection details:")
println(bootstrap_servers)

// Uncomment to print the remaining details (avoid sharing the output, it contains the password)
// println(username)
// println(password)
// println(topic)


Connection details:
kafka05-prod01.messagehub.services.us-south.bluemix.net:9093,kafka03-prod01.messagehub.services.us-south.bluemix.net:9093,kafka04-prod01.messagehub.services.us-south.bluemix.net:9093,kafka01-prod01.messagehub.services.us-south.bluemix.net:9093,kafka02-prod01.messagehub.services.us-south.bluemix.net:9093
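Printing the values shows what was parsed, but a malformed broker list is easy to miss by eye (for example, quotes or brackets left over from the JSON). The following is a minimal, hypothetical helper, not part of Message Hub or of the libraries used in this notebook: it checks offline that each comma-separated entry has the form host:port with a numeric port between 1 and 65535. It does not contact the brokers, so it cannot confirm that they are reachable or that the credentials are valid.

```scala
// Hypothetical helper (illustration only): sanity-checks the format of a
// Kafka bootstrap.servers string such as "host1:port1,host2:port2".
// It never opens a network connection.
object BootstrapCheck {

  /** Returns the problems found; an empty list means every entry looks like host:port. */
  def problems(bootstrapServers: String): List[String] =
    if (bootstrapServers.trim.isEmpty) List("bootstrap.servers is empty")
    else bootstrapServers.split(",").toList.map(_.trim).flatMap(checkEntry)

  // A single entry must be exactly "host:port" with a numeric port in 1..65535
  private def checkEntry(entry: String): List[String] =
    entry.split(":") match {
      case Array(host, port)
          if host.nonEmpty && port.nonEmpty && port.length <= 5 && port.forall(_.isDigit) =>
        val p = port.toInt
        if (p >= 1 && p <= 65535) Nil else List(s"port out of range in '$entry'")
      case _ =>
        List(s"expected host:port but found '$entry'")
    }

  def main(args: Array[String]): Unit = {
    val good = "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093," +
      "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093"
    println(problems(good)) // List()

    // A value that still carries JSON quotes is reported as malformed
    println(problems("\"kafka01-prod01.messagehub.services.us-south.bluemix.net:9093\""))
  }
}
```

In the notebook, calling BootstrapCheck.problems(bootstrap_servers) after the parsing cell should return an empty list if the broker list is well formed.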

Next, we connect to Kafka. The next cell runs indefinitely, until you stop or interrupt the kernel.

After starting the cell, open the Step 4 notebook in another tab and use it to send some data to Message Hub. Then come back to this notebook and check the output under the next cell: the data you sent should be printed by this consumer.

Note that it can take up to 60 seconds before this consumer prints the data you sent, because the stream is processed in 60-second batches.


In [ ]:
import net.christophersnow.config.MessageHubConfig
import net.christophersnow.dstream.KafkaStreaming.KafkaStreamingContextAdapter

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

import org.apache.kafka.common.serialization.StringDeserializer

// Pass the connection details parsed earlier to the Message Hub configuration
val kafkaProps = new MessageHubConfig

kafkaProps.setConfig("bootstrap.servers",   bootstrap_servers)
kafkaProps.setConfig("kafka.user.name",     username)
kafkaProps.setConfig("kafka.user.password", password)
kafkaProps.setConfig("kafka.topic",         topic)

kafkaProps.createConfiguration()

// Group incoming messages into 60-second batches (sc is the notebook's SparkContext)
val ssc = new StreamingContext(sc, Seconds(60))

// Keys and values are both deserialized as plain strings
val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
  kafkaProps,
  List(kafkaProps.getConfig("kafka.topic"))
)

stream.foreachRDD { rdd =>
  // Only create an output folder when the batch actually contains data;
  // isEmpty() stops after the first record instead of counting the whole batch
  if (!rdd.isEmpty()) {
    val outDir = s"test-${java.util.UUID.randomUUID.toString}"
    rdd.saveAsTextFile(outDir)
  }
}

// Print the first records of each batch, then start the stream and block
stream.print()
ssc.start()
ssc.awaitTermination()
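The delay of up to 60 seconds mentioned before the consumer cell comes from the batch interval passed to the StreamingContext (Seconds(60)). The sketch below is an idealized model, assuming batch boundaries fall at exact multiples of the interval and ignoring scheduling and processing time: a record arriving at time t is only emitted when its batch closes at the next boundary, so it waits at most one full interval. The object BatchDelay and its method are hypothetical, written only for illustration.

```scala
// Hypothetical illustration of Spark Streaming's batching arithmetic
// (idealized: boundaries at multiples of the interval, zero processing time).
object BatchDelay {

  /** Seconds a record arriving at `arrivalSec` waits until its batch closes. */
  def waitUntilBatchCloses(arrivalSec: Double, intervalSec: Double): Double = {
    require(intervalSec > 0, "batch interval must be positive")
    require(arrivalSec >= 0, "arrival time must be non-negative")
    val batchIndex = math.floor(arrivalSec / intervalSec) // batch the record falls into
    val batchEnd   = (batchIndex + 1) * intervalSec       // when that batch is emitted
    batchEnd - arrivalSec
  }

  def main(args: Array[String]): Unit = {
    val interval = 60.0 // same value as Seconds(60) in the consumer cell
    for (t <- Seq(1.0, 30.0, 59.0, 61.0)) {
      val w = waitUntilBatchCloses(t, interval)
      println(f"arrives at $t%5.1f s, emitted after $w%5.1f s")
    }
  }
}
```

A shorter interval such as Seconds(10) would reduce the wait, at the cost of more, smaller batches and, with the consumer cell above, more test-* output folders.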
